home *** CD-ROM | disk | FTP | other *** search
- #!/usr/bin/python
- # Copyright (c) 2005-2009 Canonical Ltd
- #
- # AUTHOR:
- # Michael Vogt <mvo@ubuntu.com>
- #
- # This file is part of unattended-upgrades
- #
- # unattended-upgrades is free software; you can redistribute it and/or
- # modify it under the terms of the GNU General Public License as published
- # by the Free Software Foundation; either version 2 of the License, or (at
- # your option) any later version.
- #
- # unattended-upgrades is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- # General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with unattended-upgrades; if not, write to the Free Software
- # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- #
-
- import apt_inst
- import apt_pkg
-
- import sys
- import os
- import string
- import datetime
- import ConfigParser
-
- from StringIO import StringIO
- from optparse import OptionParser
- from subprocess import Popen, PIPE
-
- import warnings
- warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning)
- import apt
- import logging
- import subprocess
-
- import gettext
- from gettext import gettext as _
-
- class MyCache(apt.Cache):
- def __init__(self):
- apt.Cache.__init__(self)
- def clear(self):
- self._depcache.Init()
- assert (self._depcache.InstCount == 0 and
- self._depcache.BrokenCount == 0 and
- self._depcache.DelCount == 0)
-
-
- def is_allowed_origin(pkg, allowed_origins):
- if not pkg.candidate:
- return False
- for origin in pkg.candidate.origins:
- for allowed in allowed_origins:
- if origin.origin == allowed[0] and origin.archive == allowed[1]:
- return True
- return False
-
- def check_changes_for_sanity(cache, allowed_origins, blacklist):
- if cache._depcache.BrokenCount != 0:
- return False
- for pkg in cache:
- if pkg.markedDelete:
- logging.debug("pkg '%s' now marked delete" % pkg.name)
- return False
- if pkg.markedInstall or pkg.markedUpgrade:
- if not is_allowed_origin(pkg, allowed_origins):
- logging.debug("pkg '%s' not in allowed origin" % pkg.name)
- return False
- if pkg.name in blacklist:
- logging.debug("pkg '%s' blacklisted" % pkg.name)
- return False
- if pkg._pkg.SelectedState == apt_pkg.SelStateHold:
- logging.debug("pkg '%s' is on hold" % pkg.name)
- return False
- return True
-
- def pkgname_from_deb(debfile):
- # FIXME: add error checking here
- try:
- control = apt_inst.debExtractControl(open(debfile))
- sections = apt_pkg.ParseSection(control)
- return sections["Package"]
- except (IOError, SystemError), e:
- logging.error("failed to read deb file '%s' (%s)" % (debfile, e))
- # dumb fallback
- return debfile.split("_")[0]
-
- def conffile_prompt(destFile):
- logging.debug("check_conffile_prompt('%s')" % destFile)
- pkgname = pkgname_from_deb(destFile)
- status_file = apt_pkg.Config.Find("Dir::State::status")
- parse = apt_pkg.ParseTagFile(open(status_file,"r"))
- while parse.Step() == 1:
- if parse.Section.get("Package") == pkgname:
- logging.debug("found pkg: %s" % pkgname)
- if parse.Section.has_key("Conffiles"):
- conffiles = parse.Section.get("Conffiles")
- # Conffiles:
- # /etc/bash_completion.d/m-a c7780fab6b14d75ca54e11e992a6c11c
- for line in string.split(conffiles,"\n"):
- logging.debug("conffile line: %s", line)
- l = string.split(string.strip(line))
- file = l[0]
- md5 = l[1]
- if len(l) > 2:
- obs = l[2]
- else:
- obs = None
- if os.path.exists(file) and obs != "obsolete":
- current_md5 = apt_pkg.md5sum(open(file).read())
- if current_md5 != md5:
- return True
- return False
-
-
- def dpkg_conffile_prompt():
- if not apt_pkg.Config.has_key("DPkg::Options"):
- return True
- options = apt_pkg.Config.ValueList("DPkg::Options")
- for option in map(string.strip, options):
- if (option == "--force-confold" or
- option == "--force-confnew"):
- return False
- return True
-
- def rewind_cache(cache, pkgs_to_upgrade):
- " set the cache back to the state with packages_to_upgrade "
- cache.clear()
- for pkg2 in pkgs_to_upgrade:
- pkg2.markUpgrade()
-
- def host():
- return os.uname()[1]
-
- # *sigh* textwrap is nice, but it breaks "linux-image" into two
- # seperate lines
- def wrap(t, width=70, subsequent_indent=""):
- out = ""
- for s in t.split():
- if (len(out)-out.rfind("\n")) + len(s) > width:
- out += "\n" + subsequent_indent
- out += s + " "
- return out
-
- def setup_apt_listchanges():
- " deal with apt-listchanges "
- conf = "/etc/apt/listchanges.conf"
- if os.path.exists(conf):
- # check if mail is used by apt-listchanges
- cf = ConfigParser.ConfigParser()
- cf.read(conf)
- if cf.has_section("apt") and cf.has_option("apt","frontend"):
- frontend = cf.get("apt","frontend")
- if frontend == "mail" and os.path.exists("/usr/sbin/sendmail"):
- # mail frontend and sendmail, we are fine
- logging.debug("apt-listchanges is set to mail frontend, ignoring")
- return
- # setup env (to play it safe) and return
- os.putenv("APT_LISTCHANGES_FRONTEND","none");
-
- def send_summary_mail(pkgs, res, pkgs_kept_back, mem_log, logfile_dpkg):
- " send mail (if configured in Unattended-Upgrades::Mail) "
- email = apt_pkg.Config.Find("Unattended-Upgrade::Mail", "")
- if not email:
- return
- if not os.path.exists("/usr/bin/mail"):
- logging.error(_("No '/usr/bin/mail', can not send mail. "
- "You probably want to install the 'mailx' package."))
- return
- logging.debug("Sending mail with '%s' to '%s'" % (logfile_dpkg, email))
- mail = subprocess.Popen(["/usr/bin/mail",
- "-s", _("unattended-upgrades result "
- "for '%s'") % host(),
- email],
- stdin=subprocess.PIPE)
- s = _("Unattended upgrade returned: %s\n\n") % res
- s += _("Packages that are upgraded:\n")
- s += " " + wrap(pkgs, 70, " ")
- s += "\n"
- if pkgs_kept_back:
- s += _("Packages with upgradable origin but kept back:\n")
- s += " " + wrap(" ".join(pkgs_kept_back), 70, " ")
- s += "\n"
- s += "\n"
- s += _("Package installation log:")+"\n"
- s += open(logfile_dpkg).read()
- s += "\n\n"
- s += _("Unattended-upgrades log:\n")
- s += mem_log.getvalue()
- mail.stdin.write(s)
- mail.stdin.close()
- ret = mail.wait()
- logging.debug("mail returned: %s" % ret)
-
-
- def main():
- # init the options
- parser = OptionParser()
- parser.add_option("-d", "--debug",
- action="store_true", dest="debug", default=False,
- help=_("print debug messages"))
- parser.add_option("", "--dry-run",
- action="store_true", default=False,
- help=_("Simulation, download but do not install"))
- (options, args) = parser.parse_args()
-
- # setup logging
- logger = logging.getLogger()
- mem_log = StringIO()
- if options.debug:
- logger.setLevel(logging.DEBUG)
- stderr_handler = logging.StreamHandler()
- logger.addHandler(stderr_handler)
- if apt_pkg.Config.Find("Unattended-Upgrade::Mail", ""):
- mem_log_handler = logging.StreamHandler(mem_log)
- logger.addHandler(mem_log_handler)
-
- # format (origin, archive), e.g. ("Ubuntu","dapper-security")
- allowed_origins = map(string.split, apt_pkg.Config.ValueList("Unattended-Upgrade::Allowed-Origins"))
-
- # pkgs that are (for some reason) not save to install
- blacklisted_pkgs = apt_pkg.Config.ValueList("Unattended-Upgrade::Package-Blacklist")
- logging.info(_("Initial blacklisted packages: %s"), " ".join(blacklisted_pkgs))
- logging.info(_("Starting unattended upgrades script"))
-
- # display available origin
- logging.info(_("Allowed origins are: %s") % map(str,allowed_origins))
-
- # check and get lock
- try:
- apt_pkg.PkgSystemLock()
- except SystemError, e:
- logging.error(_("Lock could not be acquired (another package "
- "manager running?)"))
- print _("Cache lock can not be acquired, exiting")
- sys.exit(1)
-
- # get a cache
- cache = MyCache()
- if cache._depcache.BrokenCount > 0:
- print _("Cache has broken packages, exiting")
- logging.error(_("Cache has broken packages, exiting"))
- sys.exit(1)
- # speed things up with latest apt
- actiongroup = apt_pkg.GetPkgActionGroup(cache._depcache)
-
- # find out about the packages that are upgradable (in a allowed_origin)
- pkgs_to_upgrade = []
- pkgs_kept_back = []
- for pkg in cache:
- if options.debug and pkg.isUpgradable:
- logging.debug("Checking: %s (%s)" % (pkg.name, map(str, pkg.candidate.origins)))
- if (pkg.isUpgradable and
- is_allowed_origin(pkg,allowed_origins)):
- try:
- pkg.markUpgrade()
- if check_changes_for_sanity(cache, allowed_origins,
- blacklisted_pkgs):
- pkgs_to_upgrade.append(pkg)
- else:
- logging.debug("sanity check failed")
- rewind_cache(cache, pkgs_to_upgrade)
- pkgs_kept_back.append(pkg.name)
- except SystemError, e:
- # can't upgrade
- logging.warning(_("package '%s' upgradable but fails to be marked for upgrade (%s)") % (pkg.name, e))
- rewind_cache(cache, pkgs_to_ugprade)
- pkgs_kept_back.append(pkg.name)
-
-
- pkgs = "\n".join([pkg.name for pkg in pkgs_to_upgrade])
- logging.debug("pkgs that look like they should be upgraded: %s" % pkgs)
-
- # download what looks good
- if options.debug:
- fetcher = apt_pkg.GetAcquire(apt.progress.TextFetchProgress())
- else:
- fetcher = apt_pkg.GetAcquire()
- list = apt_pkg.GetPkgSourceList()
- list.ReadMainList()
- recs = cache._records
- pm = apt_pkg.GetPackageManager(cache._depcache)
- try:
- pm.GetArchives(fetcher,list,recs)
- except SystemError, e:
- logging.error(_("GetArchives() failed: '%s'") % e)
- res = fetcher.Run()
-
- if dpkg_conffile_prompt():
- # now check the downloaded debs for conffile conflicts and build
- # a blacklist
- for item in fetcher.Items:
- logging.debug("%s" % item)
- if item.Status == item.StatError:
- print _("An error ocured: '%s'") % item.ErrorText
- logging.error(_("An error ocured: '%s'") % item.ErrorText)
- if item.Complete == False:
- print _("The URI '%s' failed to download, aborting") % item.DescURI
- logging.error(_("The URI '%s' failed to download, aborting") % item.DescURI)
- sys.exit(1)
- if not os.path.exists(item.DestFile):
- print _("Download finished, but file '%s' not there?!?" % item.DestFile)
- logging.error("Download finished, but file '%s' not there?!?" % item.DestFile)
- sys.exit(1)
- if item.IsTrusted == False:
- blacklisted_pkgs.append(pkgname_from_deb(item.DestFile))
- if conffile_prompt(item.DestFile):
- # FIXME: skip package (means to re-run the whole marking again
- # and making sure that the package will not be pulled in by
- # some other package again!
- logging.warning(_("Package '%s' has conffile prompt and needs to be upgraded manually") % pkgname_from_deb(item.DestFile))
- blacklisted_pkgs.append(pkgname_from_deb(item.DestFile))
- pkgs_kept_back.append(pkgname_from_deb(item.DestFile))
-
-
- # redo the selection about the packages to upgrade based on the new
- # blacklist
- logging.debug("blacklist: %s" % blacklisted_pkgs)
- # find out about the packages that are upgradable (in a allowed_origin)
- if len(blacklisted_pkgs) > 0:
- cache.clear()
- old_pkgs_to_upgrade = pkgs_to_upgrade[:]
- pkgs_to_upgrade = []
- for pkg in old_pkgs_to_upgrade:
- logging.debug("Checking (blacklist): %s" % (pkg.name))
- pkg.markUpgrade()
- if check_changes_for_sanity(cache, allowed_origins,
- blacklisted_pkgs):
- pkgs_to_upgrade.append(pkg)
- else:
- if not (pkg.name in pkgs_kept_back):
- pkgs_kept_back.append(pkg.name)
- logging.info(_("package '%s' not upgraded") % pkg.name)
- cache.clear()
- for pkg2 in pkgs_to_upgrade:
- pkg2.markUpgrade()
- else:
- logging.debug("dpkg is configured not to cause conffile prompts")
-
- logging.debug("InstCount=%i DelCount=%i BrokenCout=%i" % (cache._depcache.InstCount, cache._depcache.DelCount, cache._depcache.BrokenCount))
-
- # exit if there is nothing to do and nothing to report
- if (len(pkgs_to_upgrade) == 0) and (len(pkgs_kept_back) == 0):
- logging.info(_("No packages found that can be upgraded unattended"))
- sys.exit(0)
-
- # check if we are in dry-run mode
- if options.dry_run:
- logging.info("Option --dry-run given, *not* performing real actions")
- apt_pkg.Config.Set("Debug::pkgDPkgPM","1")
-
- # do the install based on the new list of pkgs
- pkgs = " ".join([pkg.name for pkg in pkgs_to_upgrade])
- logging.info(_("Packages that are upgraded: %s" % pkgs))
-
- # set debconf to NON_INTERACTIVE, redirect output
- os.putenv("DEBIAN_FRONTEND","noninteractive");
- setup_apt_listchanges()
-
- # redirect to log
- REDIRECT_INPUT = os.devnull
- fd = os.open(REDIRECT_INPUT, os.O_RDWR)
- os.dup2(fd,0)
-
- now = datetime.datetime.now()
- logfile_dpkg = logdir+'unattended-upgrades-dpkg_%s.log' % now.isoformat('_')
- logging.info(_("Writing dpkg log to '%s'") % logfile_dpkg)
- fd = os.open(logfile_dpkg, os.O_RDWR|os.O_CREAT, 0644)
- old_stdout = os.dup(1)
- old_stderr = os.dup(2)
- os.dup2(fd,1)
- os.dup2(fd,2)
-
- # create a new package-manager. the blacklist may have changed
- # the markings in the depcache
- pm = apt_pkg.GetPackageManager(cache._depcache)
- if not pm.GetArchives(fetcher,list,recs):
- logging.error(_("pm.GetArchives() failed"))
- # run the fetcher again (otherwise local file://
- # URIs are unhappy (see LP: #56832)
- res = fetcher.Run()
- # unlock the cache
- try:
- apt_pkg.PkgSystemUnLock()
- except SystemError, e:
- pass
- # lock for the shutdown check - its fine if the system
- # is shutdown while downloading but not so much while installing
- apt_pkg.GetLock("/var/run/unattended-upgrades.lock")
- # now do the actual install
- error = None
- try:
- res = pm.DoInstall()
- except SystemError,e:
- error = e
- res = pm.ResultFailed
- finally:
- os.dup2(old_stdout, 1)
- os.dup2(old_stderr, 2)
-
- if res == pm.ResultFailed:
- logging.error(_("Installing the upgrades failed!"))
- logging.error(_("error message: '%s'") % e)
- logging.error(_("dpkg returned a error! See '%s' for details") % logfile_dpkg)
- else:
- logging.info(_("All upgrades installed"))
-
- # send a mail (if needed)
- pkg_install_success = (res != pm.ResultFailed)
- send_summary_mail(pkgs, pkg_install_success, pkgs_kept_back, mem_log, logfile_dpkg)
-
- # auto-reboot (if required and the config for this is set
- if (apt_pkg.Config.FindB("Unattended-Upgrade::Automatic-Reboot", False) and
- os.path.exists("/var/run/reboot-required")):
- logging.warning("Found /var/run/reboot-required, rebooting")
- subprocess.call(["/sbin/reboot"])
-
-
- if __name__ == "__main__":
- localesApp="unattended-upgrades"
- localesDir="/usr/share/locale"
- gettext.bindtextdomain(localesApp, localesDir)
- gettext.textdomain(localesApp)
-
- if os.getuid() != 0:
- print _("You need to be root to run this application")
- sys.exit(1)
-
- if not os.path.exists("/var/log/unattended-upgrades"):
- os.makedirs("/var/log/unattended-upgrades")
-
- # init the logging
- logdir = apt_pkg.Config.FindDir("APT::UnattendedUpgrades::LogDir",
- "/var/log/unattended-upgrades/")
- logfile = logdir+apt_pkg.Config.Find("APT::UnattendedUpgrades::LogFile",
- "unattended-upgrades.log")
- logging.basicConfig(level=logging.INFO,
- format='%(asctime)s %(levelname)s %(message)s',
- filename=logfile)
- # run the main code
- main()
-